From: Jeroen van der Heijden Date: Thu, 4 Oct 2018 20:58:08 +0000 (+0200) Subject: Config buffer X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~8^2~39 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/%22/%22http:/www.example.com/cgi/%22?a=commitdiff_plain;h=631f55a7a3c08f11f33d3f3efa7ece71bbcd3248;p=siridb-server.git Config buffer --- diff --git a/ChangeLog-2.0.30 b/ChangeLog-2.0.30 index 0f6b6d11..dda9f7e3 100644 --- a/ChangeLog-2.0.30 +++ b/ChangeLog-2.0.30 @@ -10,4 +10,6 @@ * Use posix_fadvise() on the buffer file. (@Svedrin) - * Refactor buffer and cleanup alternative buffer path. \ No newline at end of file + * The buffer size can now be adjusted by using the database.conf + configuration file. + \ No newline at end of file diff --git a/include/siri/db/buffer.h b/include/siri/db/buffer.h index db2b56dd..7f037d74 100644 --- a/include/siri/db/buffer.h +++ b/include/siri/db/buffer.h @@ -34,9 +34,11 @@ int siridb_buffer_test_path(siridb_t * siridb); int siridb_buffer_write_empty( siridb_buffer_t * buffer, siridb_series_t * series); -int siridb_buffer_write_last_point( +int siridb_buffer_write_point( siridb_buffer_t * buffer, - siridb_series_t * series); + siridb_series_t * series, + uint64_t * ts, + qp_via_t * val); struct siridb_buffer_s { diff --git a/include/siri/db/series.h b/include/siri/db/series.h index 64432c96..f75b1d86 100644 --- a/include/siri/db/series.h +++ b/include/siri/db/series.h @@ -116,6 +116,7 @@ siridb_points_t * siridb_series_get_first( siridb_points_t * siridb_series_get_last( siridb_series_t * series, int * required_shard); siridb_points_t * siridb_series_get_count(siridb_series_t * series); +void siridb_series_ensure_type(siridb_series_t * series, qp_obj_t * qp_obj); /* * Increment the series reference counter. */ diff --git a/itest/run_all.py b/itest/run_all.py index 5778f982..9ea0bfa7 100644 --- a/itest/run_all.py +++ b/itest/run_all.py @@ -14,6 +14,7 @@ from test_compression import TestCompression from test_log import TestLog from test_log import TestLog from test_pipe_support import TestPipeSupport +from test_buffer import TestBuffer Server.BUILDTYPE = 'Release' @@ -31,3 +32,4 @@ if __name__ == '__main__': run_test(TestUser()) run_test(TestLog()) run_test(TestPipeSupport()) + run_test(TestBuffer()) diff --git a/itest/siridb-random-data.py b/itest/siridb-random-data.py index 53e29e3a..f4334d12 100755 --- a/itest/siridb-random-data.py +++ b/itest/siridb-random-data.py @@ -77,7 +77,7 @@ class Series: _interval_range = None _r = None - def __init__(self, r, allowed_kinds=(int, float, str)): + def __init__(self, r, allowed_kinds=(int, float, str), wrong_type=False): self.kind = r.choice(allowed_kinds) self.lval = { str: '', @@ -96,13 +96,13 @@ class Series: self.likely_equal = r.choice([0.01, 0.1, 0.2, 0.5, 0.99]) self.likely_change_sign = r.choice([0.0, 0.1, 0.25, 0.5, 0.9]) - self.as_int = self.kind == float and r.random() > 0.9 + self.as_int = wrong_type and self.kind == float and r.random() > 0.9 self.likely_inf = r.random() * 0.2 \ if self.kind == float and r.random() > 0.95 else False self.likely_nan = r.random() * 0.2 \ if self.kind == float and r.random() > 0.95 else False - self.gen_float = self.kind == int and r.random() > 0.97 + self.gen_float = wrong_type and self.kind == int and r.random() > 0.97 self.name = self._gen_name() Series._series.append(self) @@ -166,7 +166,10 @@ class Series: kinds = [translate[k] for k in args.kinds] for i in range(args.num_series): - Series(r=series_rand, allowed_kinds=kinds) + Series( + r=series_rand, + allowed_kinds=kinds, + wrong_type=args.wrong_type) def _gen_name(self): name = '/n:{}/range:{},{}/eq:{}/cs:{}/opt:{}{}{}{}{}'.format( @@ -409,6 +412,11 @@ if __name__ == '__main__': default=('int', 'float'), choices=('int', 'float')) # , 'str' + parser.add_argument( + '--wrong-type', + action='store_true', + help='Allow series to insert points using a wrong type') + parser.add_argument( '--max-parallel', type=int, diff --git a/itest/test_buffer.py b/itest/test_buffer.py new file mode 100644 index 00000000..28a8e84c --- /dev/null +++ b/itest/test_buffer.py @@ -0,0 +1,123 @@ +import os +import asyncio +import functools +import random +import time +from testing import Client +from testing import default_test_setup +from testing import gen_data +from testing import gen_points +from testing import gen_series +from testing import InsertError +from testing import PoolError +from testing import QueryError +from testing import run_test +from testing import Series +from testing import Server +from testing import ServerError +from testing import SiriDB +from testing import TestBase +from testing import UserAuthError + + +class TestBuffer(TestBase): + title = 'Test buffer object' + + async def _add_points(self): + for series_name in ['iris', 'db', 'ligo', 'sasha']: + if series_name not in self.total: + self.total[series_name] = [] + batches = sum([ord(c) for c in series_name]) % 100 + for i in range(batches): + npoints = [] + n = int(i**0.5 * 10000 % 5) + 1 + for p in range(n): + self.ts += (n + 5000) if i % 2 else (n - 5000) + npoints.append([self.ts, i*1000+p]) + self.total[series_name].extend(npoints) + self.total[series_name].sort() + await self.client0.insert({series_name: npoints}) + + async def _test_equal(self): + for series_name, points in self.total.items(): + res = await self.client0.query(f'select * from "{series_name}"') + res = res[series_name] + self.assertEqual(len(points), len(res)) + self.assertEqual(points, res) + + async def _change_buf_size(self, buffer_size): + self.client0.close() + result = await self.server0.stop() + self.assertTrue(result) + self.server0.set_buffer_size(self.db, buffer_size) + await self.server0.start(sleep=5) + await self.client0.connect() + res = await self.client0.query('show buffer_size') + self.assertEqual(res['data'][0]['value'], buffer_size) + await self._test_equal() + + async def _change_buf_path(self, buffer_path): + self.client0.close() + result = await self.server0.stop() + self.assertTrue(result) + self.server0.set_buffer_path(self.db, buffer_path) + await self.server0.start(sleep=5) + await self.client0.connect() + res = await self.client0.query('show buffer_path') + self.assertEqual(res['data'][0]['value'], buffer_path) + res = await self.client0.query('show open_files') + self.assertEqual(res['data'][0]['value'], 3) + res = await self.client0.query( + f'alter server {self.uuid} set backup_mode true') + await asyncio.sleep(5) + res = await self.client0.query('show open_files') + self.assertEqual(res['data'][0]['value'], 0) + res = await self.client0.query( + f'alter server {self.uuid} set backup_mode false') + await self._test_equal() + + @default_test_setup(1, time_precision='s', compression=False) + async def run(self): + await self.client0.connect() + + res = await self.client0.query('show uuid') + self.uuid = res['data'][0]['value'] + + self.ts = 1500000000 + self.total = {} + + await self._add_points() + await self._test_equal() + + await self._change_buf_path(os.path.join( + self.server0.dbpath, + self.db.dbname, + '../buf/')) + + await self._change_buf_size(8192) + + await self._add_points() + await self._test_equal() + + await self._change_buf_size(8192) + await self._change_buf_size(512) + + await self._add_points() + await self._test_equal() + + await self._change_buf_size(1024) + + await self._change_buf_path(os.path.join( + self.server0.dbpath, + self.db.dbname, + 'buf/')) + + return False + + +if __name__ == '__main__': + SiriDB.LOG_LEVEL = 'INFO' + Server.HOLD_TERM = True + Server.MEM_CHECK = True + Server.BUILDTYPE = 'Debug' + run_test(TestBuffer()) diff --git a/itest/test_select.py b/itest/test_select.py index 4031e4e0..1a9f5c61 100644 --- a/itest/test_select.py +++ b/itest/test_select.py @@ -87,7 +87,7 @@ DATA = { class TestSelect(TestBase): title = 'Test select and aggregate functions' - @default_test_setup(1, compression=False) + @default_test_setup(1, compression=False, buffer_size=1024) async def run(self): await self.client0.connect() diff --git a/itest/test_series.py b/itest/test_series.py index 37b35945..4ff58502 100644 --- a/itest/test_series.py +++ b/itest/test_series.py @@ -23,6 +23,48 @@ PI = 'ԉ' Klingon = '     ' + \ 'qajunpaQHeylIjmo’ batlh DuSuvqang charghwI’ ‘It.' +data = { + 'string': [ + [1538660000, "some string value"], + [1538660010, -123456789], + [1538660020, -0.5], + [1538660030, 1/3], + ], + 'integer': [ + [1538660000, 1], + [1538660010, 35.6], + [1538660020, "-50,6%"], + [1538660030, ""], + ], + 'double': [ + [1538660000, 1.0], + [1538660010, -35], + [1538660010, "-50,6%"], + [1538660030, ""], + ] +} + +expected = { + 'string': [ + [1538660000, "some string value"], + [1538660010, '-123456789'], + [1538660020, '-0,500000'], + [1538660030, '0,333333'], + ], + 'integer': [ + [1538660000, 1], + [1538660010, 35], + [1538660020, -50], + [1538660030, 0], + ], + 'double': [ + [1538660000, 1.0], + [1538660010, -35.0], + [1538660010, -50.6], + [1538660030, 0.0], + ] +} + class TestSeries(TestBase): title = 'Test series object' @@ -47,6 +89,15 @@ class TestSeries(TestBase): await self.client0.query('select * from "{}"'.format(Klingon)), {Klingon: sorted(points)}) + self.assertEqual( + await self.client0.insert(data), + {'success_msg': 'Successfully inserted 12 point(s).'}) + + self.assertAlmostEqual( + await self.client0.query( + 'select * from "string", "integer", "double"'), + expected) + self.client0.close() # return False diff --git a/itest/testing/server.py b/itest/testing/server.py index c5d3b508..a629449e 100644 --- a/itest/testing/server.py +++ b/itest/testing/server.py @@ -59,6 +59,8 @@ class Server: self.dbpath = os.path.join(TEST_DIR, 'dbpath{}'.format(self.n)) self.name = 'SiriDB:{}'.format(self.listen_backend_port) self.pid = None + self.buffer_path = None + self.buffer_size = None @property def addr(self): @@ -164,6 +166,35 @@ class Server: self.pid = None return True + def set_buffer_size(self, db, buffer_size): + self.buffer_size = buffer_size + config = configparser.RawConfigParser() + config.add_section('buffer') + if self.buffer_path is not None: + config.set('buffer', 'path', self.buffer_path) + config.set('buffer', 'size', self.buffer_size) + with open(os.path.join( + self.dbpath, db.dbname, 'database.conf'), 'w') as f: + config.write(f) + + def set_buffer_path(self, db, buffer_path): + assert(buffer_path.endswith('/')) + curfile = os.path.join(self.dbpath, db.dbname, 'buffer.dat') \ + if self.buffer_path is None else \ + os.path.join(self.buffer_path, 'buffer.dat') + if not os.path.exists(buffer_path): + os.makedirs(buffer_path) + os.rename(curfile, os.path.join(buffer_path, 'buffer.dat')) + self.buffer_path = buffer_path + config = configparser.RawConfigParser() + config.add_section('buffer') + config.set('buffer', 'path', self.buffer_path) + if self.buffer_size is not None: + config.set('buffer', 'size', self.buffer_size) + with open(os.path.join( + self.dbpath, db.dbname, 'database.conf'), 'w') as f: + config.write(f) + def kill(self): print("!!!!!!!!!!!! KILLL !!!!!!!!!!") os.system('kill -9 {}'.format(self.pid)) diff --git a/itest/testing/testbase.py b/itest/testing/testbase.py index 9efcbaf5..d9a7204a 100644 --- a/itest/testing/testbase.py +++ b/itest/testing/testbase.py @@ -116,7 +116,9 @@ class TestBase(unittest.TestCase): assert isinstance(point, list) and len(point) == 2, \ 'Expecting a point to be a list of 2 items' super().assertEqual(a[series][i][0], point[0]) - if math.isnan(a[series][i][1]): + if isinstance(a[series][i][1], str): + super().assertEqual(a[series][i][1], point[1]) + elif math.isnan(a[series][i][1]): assert math.isnan(point[1]), \ 'Expecting point `{}` to be `nan`, got: `{}`' \ .format(i, point[1]) diff --git a/src/siri/db/buffer.c b/src/siri/db/buffer.c index 2cf61269..dc860bc3 100644 --- a/src/siri/db/buffer.c +++ b/src/siri/db/buffer.c @@ -152,20 +152,19 @@ int siridb_buffer_write_empty( * * Returns 0 if success or EOF in case of an error. */ -int siridb_buffer_write_last_point( +int siridb_buffer_write_point( siridb_buffer_t * buffer, - siridb_series_t * series) + siridb_series_t * series, + uint64_t * ts, + qp_via_t * val) { - siridb_point_t * point; const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t); char buf[sz]; - int last_idx = series->buffer->len - 1; - assert (last_idx >= 0); - point = series->buffer->data + last_idx; - - memcpy(buf, &point->ts, sizeof(uint64_t)); - memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t)); + ssize_t last_idx = series->buffer->len - 1; + assert (last_idx >= 0); + memcpy(buf, ts, sizeof(uint64_t)); + memcpy(buf + sizeof(uint64_t), val, sizeof(qp_via_t)); return ( /* jump to position where to write the new point */ diff --git a/src/siri/db/insert.c b/src/siri/db/insert.c index 6ca4b23f..cd1f6b14 100644 --- a/src/siri/db/insert.c +++ b/src/siri/db/insert.c @@ -553,6 +553,7 @@ static int8_t INSERT_local_work( ts = (uint64_t *) &qp_series_ts.via.int64; SERIES_UPDATE_TS(series) + siridb_series_ensure_type(series, &qp_series_val); if ((tp = qp_next(unpacker, qp_series_name)) != QP_ARRAY2 && series->buffer != NULL) { @@ -584,10 +585,7 @@ static int8_t INSERT_local_work( if (series->tp == TP_STRING) { val = &forstr; - val->str = qp_is_raw(qp_series_val.tp) ? - strndup(qp_series_val.via.str, qp_series_val.len) : - strdup(""); - + val->str = strndup(qp_series_val.via.str, qp_series_val.len); if (val->str == NULL) { ERR_ALLOC @@ -606,13 +604,12 @@ static int8_t INSERT_local_work( { qp_next(unpacker, &qp_series_ts); /* ts */ qp_next(unpacker, &qp_series_val); /* val */ + siridb_series_ensure_type(series, &qp_series_val); if (series->tp == TP_STRING) { - val->str = qp_is_raw(qp_series_val.tp) ? - strndup(qp_series_val.via.str, qp_series_val.len) : - strdup(""); - + val->str = \ + strndup(qp_series_val.via.str, qp_series_val.len); if (val->str == NULL) { ERR_ALLOC @@ -772,6 +769,8 @@ static int INSERT_local_work_test( ts = (uint64_t *) &qp_series_ts.via.int64; SERIES_UPDATE_TS(series) + siridb_series_ensure_type(series, &qp_series_val); + if ((tp = qp_next(unpacker, qp_series_name)) != QP_ARRAY2 && series->buffer != NULL) { @@ -803,10 +802,7 @@ static int INSERT_local_work_test( if (series->tp == TP_STRING) { val = &forstr; - val->str = qp_is_raw(qp_series_val.tp) ? - strndup(qp_series_val.via.str, qp_series_val.len) : - strdup(""); - + val->str = strndup(qp_series_val.via.str, qp_series_val.len); if (val->str == NULL) { ERR_ALLOC @@ -824,13 +820,12 @@ static int INSERT_local_work_test( { qp_next(unpacker, &qp_series_ts); /* ts */ qp_next(unpacker, &qp_series_val); /* val */ + siridb_series_ensure_type(series, &qp_series_val); if (series->tp == TP_STRING) { - val->str = qp_is_raw(qp_series_val.tp) ? - strndup(qp_series_val.via.str, qp_series_val.len) : - strdup(""); - + val->str = \ + strndup(qp_series_val.via.str, qp_series_val.len); if (val->str == NULL) { ERR_ALLOC diff --git a/src/siri/db/series.c b/src/siri/db/series.c index 7c5adde1..3989581d 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -44,6 +44,13 @@ #define BEND series->buffer->points->data[series->buffer->points->len - 1].ts #define DROPPED_DUMMY 1 +/* + * Used for storing double and integers as string. this is not very important + * if it will not store all characters generated so 64 is more than enough + */ +#define STR_TYPE_BUF_SZ 64 +static char str_type_buf[STR_TYPE_BUF_SZ]; + static int SERIES_save(siridb_t * siridb); static int SERIES_load(siridb_t * siridb, imap_t * dropped); static int SERIES_read_dropped(siridb_t * siridb, imap_t * dropped); @@ -154,7 +161,7 @@ int siridb_series_add_point( } else { - if (siridb_buffer_write_last_point(siridb->buffer, series)) + if (siridb_buffer_write_point(siridb->buffer, series, ts, val)) { ERR_FILE log_critical("Cannot write new point to buffer"); @@ -935,6 +942,86 @@ siridb_points_t * siridb_series_get_count(siridb_series_t * series) return points; } +void siridb_series_ensure_type(siridb_series_t * series, qp_obj_t * qp_obj) +{ + switch(series->tp) + { + case TP_INT: + if (qp_obj->tp != QP_INT64) + { + if (qp_obj->tp == QP_DOUBLE) + { + double d = qp_obj->via.real; + qp_obj->via.int64 = (int64_t) d; + } + else if (qp_obj->tp == QP_RAW) + { + char * s = strndup(qp_obj->via.str, qp_obj->len); + qp_obj->via.int64 = \ + (s == NULL) ? 0 : (int64_t) strtoll(s, NULL, 10); + free(s); + } + else + { + assert(0); + } + qp_obj->tp = QP_INT64; + } + return; + case TP_DOUBLE: + if (qp_obj->tp != QP_DOUBLE) + { + if (qp_obj->tp == QP_INT64) + { + int64_t i = qp_obj->via.int64; + qp_obj->via.real = (double) i; + } + else if (qp_obj->tp == QP_RAW) + { + char * s = strndup(qp_obj->via.str, qp_obj->len); + qp_obj->via.real = \ + (s == NULL) ? 0.0 : strtod(s, NULL); + free(s); + } + else + { + assert(0); + } + qp_obj->tp = TP_DOUBLE; + } + return; + case TP_STRING: + if (qp_obj->tp != QP_RAW) + { + if (qp_obj->tp == QP_INT64) + { + snprintf( + str_type_buf, + STR_TYPE_BUF_SZ, + "%" PRId64, + qp_obj->via.int64); + qp_obj->via.str = str_type_buf; + } + else if (qp_obj->tp == QP_DOUBLE) + { + snprintf( + str_type_buf, + STR_TYPE_BUF_SZ, + "%f", + qp_obj->via.real); + qp_obj->via.str = str_type_buf; + } + else + { + assert(0); + } + qp_obj->tp = QP_RAW; + } + return; + } + assert (0); +} + /* * Calculate the server id. * Returns 0 or 1, representing a server in a pool) @@ -1466,6 +1553,7 @@ static int SERIES_load(siridb_t * siridb, imap_t * dropped) siridb_series_t * series; qp_types_t tp; uint32_t series_id; + uint8_t series_tp; /* we should not have any series at this moment */ assert(siridb->max_series_id == 0); @@ -1502,10 +1590,11 @@ static int SERIES_load(siridb_t * siridb, imap_t * dropped) if (imap_get(dropped, series_id) == NULL) { + series_tp = (uint8_t) qp_series_tp.via.int64; series = SERIES_new( siridb, series_id, - (uint8_t) qp_series_tp.via.int64, + series_tp, siridb->server->pool, (const char *) qp_series_name.via.raw); if (series != NULL)